跳到主要内容

Canal 同步数据库

什么是 canal

阿里开源的框架 Canal,可以简单地把 canal 理解为一个用来同步增量数据的一个工具。

Canal 可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。当数据库发生增删改的时候,会产生一个日志文件,Canal通过读取日志文件,知道哪些数据发生了变化。在这里,我们将更新的数据给到 Canal 微服务中,然后微服务把数据写到 Redis 里。

canal 的工作原理就是把自己伪装成 MySQL slave,模拟 MySQL slave 的交互协议向 MySQL Mater 发送 dump 协议,MySQL mater 收到 canal 发送过来的 dump 请求,开始推送 binary log 给c anal,然后 canal 解析 binary log,再发送到存储目的地,比如 MySQL,Kafka,Elastic Search 等等。

但是 canal 的数据同步不是全量的,而是增量。基于 binary log 增量订阅和消费,canal 可以做:

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护
  • 业务 cache(缓存)刷新
  • 带业务逻辑的增量数据处理

配置 MySQL

参考官方文档 https://github.com/alibaba/canal/wiki/QuickStart

首先进到配置文件开启这个 binlog 模式

# 进到里面可以发现它包含了其它的配置文件
# !includedir /etc/mysql/conf.d/ # 注意,这个 / 结尾的是目录
# !includedir /etc/mysql/mysql.conf.d/

# 进到这个 mysql.conf.d/ 目录里面的配置文件可以开启这个 binlog
vim /etc/mysql/mysql.conf.d/mysqld.cnf

如果找不到这个 文件可以用命令:

mysql --help | grep my.cnf
# 注意配置文件是 my.cnf 别写错了

注意:这里的 binlog-format=ROW 表示选择ROW(行)模式,且 server_id 不要和 canal 的 slaveId 重复

[mysqld]
log-bin=mysql-bin # 注意不要写 log_bin_basename=/var/lib/mysql/mysql-bin 那些,那是旧版的配置方式
binlog_format=ROW
server_id=12345 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

重启:

service mysql restart

查看是否开启 binlog

show variables like'log_%';

SHOW VARIABLES LIKE '%bin%';

然后在 MySQL 中需要创建一个用户,并授权:

-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:canal
create user 'canal'@'%' identified by 'canal';
-- 授权 *.*表示所有库
-- REPLICATION SLAVE 主从复制权限
-- REPLICATION CLIENT 主从复制客户端权限
-- grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

-- 检查权限
select *
from mysql.user
where user = 'canal';

Docker 安装 Canal

FIXME: 这个方法有问题,就按照官网那种手动启动的好点

下载镜像:

docker pull docker.io/canal/canal-server:v1.1.5

容器安装

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server  #11111:11111:端口映射

安装完成之后还需要进行配置

docker exec -it canal /bin/bash     # 进入到canal容器中
cd canal-server/conf # 切换到配置文件所在的目录
vi canal-server/conf/canal.properties
vi canal-server/conf/example/instance.properties

进入到 canal.properties 中看看,里面配置了 Canal 的 id,端口等信息

再来看看 instance.properties,这个文件配置了数据库相关的信息

配置完成后,设置开机启动,并记得重启 canal。

docker update --restart=always canal
docker restart canal

整合到 Spring

添加依赖

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>

<!--1.1.5更新后要导入这个包-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>

编写监听器

package com.alsritter.service.search.config;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

/**
* 配置 Canal 客户端
*
* @author alsritter
* @version 1.0
**/
@Slf4j
@Component
public class CustomCanalClient implements InitializingBean {
private static final int BATCH_SIZE = 1000;

@Override
public void afterPropertiesSet() throws Exception {
// 创建链接
// 这里填写 canal 所配置的服务器 ip,端口号,destination (在 canal.properties 文件里)以及服务器账号密码
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), "example", "", "");

try {
//打开连接
connector.connect();
//订阅数据库表,全部表
connector.subscribe(".*..*");
//回滚到未进行 ack 的地方,下次 fetch 的时候,可以从最后一个没有 ack 的地方开始拿
connector.rollback();

while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
//获取批量ID
long batchId = message.getId();
//获取批量的数量
int size = message.getEntries().size();
//如果没有数据
if (batchId == -1 || size == 0) {
try {
//线程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//如果有数据,处理数据
printEntry(message.getEntries());
}
//进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}


/**
* 打印 canal server 解析 binlog 获得的实体类信息
*/
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
//开启/关闭事务的实体类型,跳过
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
//获取操作类型:insert/update/delete类型
EventType eventType = rowChage.getEventType();
//打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));

//判断是否是DDL语句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
//获取RowChange对象里的每一行数据,打印出来
for (RowData rowData : rowChage.getRowDatasList()) {
//如果是删除语句
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
//如果是新增语句
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
//如果是更新的语句
} else {
//变更前的数据
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
//变更后的数据
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}

private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}

测试是否正常工作

随便修改点数据

检查控制台,可以发现就可以监听到数据变化了

常见错误

1、服务端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example 是由于你改了配置文件,导致meta.dat 中保存的位点信息和数据库的位点信息不一致;导致canal抓取不到数据库的动作; 解决方法:删除 meta.dat 删除,再重启canal,问题解决;

2、客户端:java.lang.OutOfMemoryError: Java heap space canal消费端挂了太久,在zk对应conf下节点的 /otter/canal/destinations/test_db/1001/cursor 位点信息是很早以前,导致重启canal时,从很早以前的位点开始消费,导致canal服务器内存爆掉

3、服务端ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x0191aafd, /192.168.10.68:49502 => /192.168.10.68:11111], exception=java.io.IOException: 当客户端停掉后,canal服务端会报此异常 客户端:com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong with reason: something goes wrong with channel:[id: 0x01311037, /192.168.10.68:52086 => /192.168.10.68:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first 当服务端停掉或者重启中,客户端连不上就会抛出此异常。场景修改了服务点的配置文件此时服务端会重启,客户端就会报次异常

Message 数据格式和解析

canal:Message数据格式和解析

Reference